package com.retry.task.core.server;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.retry.task.core.executor.IExectorAdapter;
import com.retry.task.core.utils.GsonTool;
import com.retry.task.core.utils.HttpClientUtils;
import com.retry.task.core.utils.threadPool.RetryTaskLogRunable;
import com.retry.task.core.utils.threadPool.RetryTaskThreadPoolExector;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Embedded Netty HTTP server for the retry-task executor.
 *
 * <p>Each aggregated HTTP request is read on a Netty I/O thread and then handed to the
 * business thread pool, where it is passed to the {@link IExectorAdapter}; the adapter's
 * result is serialized to JSON and written back as the HTTP response body.
 *
 * @author gao.gwq
 * @version 1.0
 * @date 2022/4/16  23:12
 */
public class RetryTaskServer {
    private static final Logger logger = LoggerFactory.getLogger(RetryTaskServer.class);

    /** Seconds a connection may see neither reads nor writes before it is closed (beat 3N). */
    private static final int IDLE_TIMEOUT_SECONDS = 30 * 3;

    /** Maximum size, in bytes, of an aggregated HTTP message. */
    private static final int MAX_CONTENT_LENGTH = 5 * 1024 * 1024;

    private final IExectorAdapter iExectorAdapter;
    private final int port;
    private final RetryTaskThreadPoolExector bizThreadPool;

    /** Thread running the Netty event loops; {@code null} until {@link #start()} is called. */
    private Thread thread;

    /**
     * Creates the server and registers a JVM shutdown hook that shuts the business pool down.
     *
     * @param threadPoolExecutor pool on which request processing runs
     * @param iExectorAdapter    adapter that processes each request
     * @param port               TCP port to bind when {@link #start()} is called
     */
    public RetryTaskServer(RetryTaskThreadPoolExector threadPoolExecutor, IExectorAdapter iExectorAdapter, int port) {
        this.bizThreadPool = threadPoolExecutor;
        this.iExectorAdapter = iExectorAdapter;
        this.port = port;
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            threadPoolExecutor.shutdown();
            logger.info("-------------retry-task shutdown netty server  thread pool");
        }));
    }

    /**
     * Starts the server on a new daemon thread and returns immediately.
     * Binding happens asynchronously on that thread; bind failures are logged there.
     */
    public void start() {
        thread = new Thread(this::runServer);
        thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }

    /**
     * Interrupts the server thread (if it is running), which makes it leave its blocking
     * wait and shut the event loop groups down.
     *
     * @throws Exception declared for backward compatibility with existing callers
     */
    public void stop() throws Exception {
        // destroy server thread
        if (thread != null && thread.isAlive()) {
            thread.interrupt();
        }

        logger.info(">>>>>>>>>>> retry task remoting server destroy success.");
    }

    /** Binds the port and blocks until the server channel closes or the thread is interrupted. */
    private void runServer() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline()
                            // beat 3N, close if idle
                            .addLast(new IdleStateHandler(0, 0, IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS))
                            .addLast(new HttpServerCodec())
                            // merge request & response chunks into FULL messages
                            .addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH))
                            .addLast(new EmbedHttpServerHandler(iExectorAdapter, bizThreadPool));
                    }
                })
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            // bind
            ChannelFuture future = bootstrap.bind(port).sync();

            logger.info(">>>>>>>>>>> retry task server start success, nettype = {}, port = {}",
                RetryTaskServer.class, port);

            // block until the server channel is closed
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Raised by stop(); this thread exits right after, so the flag is not restored.
            logger.info(">>>>>>>>>>> retry task remoting server stop.");
        } catch (Exception e) {
            // e.g. bind failure; without this branch the error would escape the thread unlogged.
            logger.error(">>>>>>>>>>> retry task remoting server error.", e);
        } finally {
            try {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Channel handler that forwards each full HTTP request to the {@link IExectorAdapter}
     * on the business thread pool and writes the JSON result back to the client.
     */
    public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);

        private final IExectorAdapter exectorAdapter;
        private final ThreadPoolExecutor bizThreadPool;

        /**
         * @param exectorAdapter adapter that processes the request payload
         * @param bizThreadPool  pool on which the adapter is invoked
         */
        public EmbedHttpServerHandler(IExectorAdapter exectorAdapter, ThreadPoolExecutor bizThreadPool) {
            this.exectorAdapter = exectorAdapter;
            this.bizThreadPool = bizThreadPool;
        }

        /**
         * Copies everything needed out of the request and submits the processing task to the
         * business pool. The request fields are read eagerly here, before the hand-off, so the
         * task never touches {@code msg} from another thread.
         */
        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
            final String requestData = msg.content().toString(CharsetUtil.UTF_8);
            final String uri = msg.uri();
            final HttpMethod httpMethod = msg.method();
            final boolean keepAlive = HttpUtil.isKeepAlive(msg);
            final String traceId = msg.headers().get(HttpClientUtils.TRACE_ID);

            // Not every MDC backend accepts a null value, so only record a trace id that exists.
            if (traceId != null) {
                MDC.put("TRACE_ID", traceId);
            }
            try {
                Runnable logRun = new RetryTaskLogRunable(MDC.getCopyOfContextMap(), new Runnable() {
                    @Override
                    public void run() {
                        String responseJson;
                        try {
                            Object responseObj = exectorAdapter.processRequest(requestData, uri, httpMethod);
                            responseJson = GsonTool.toJsonStringIgnoreNull(responseObj);
                        } catch (Exception e) {
                            // Always answer the client; otherwise it waits for the idle timeout.
                            logger.error(
                                ">>>>>>>>>>> retry task provider netty_http server process request error, uri = {}",
                                uri, e);
                            writeResponse(ctx, keepAlive, HttpResponseStatus.INTERNAL_SERVER_ERROR,
                                HttpResponseStatus.INTERNAL_SERVER_ERROR.reasonPhrase());
                            return;
                        }
                        writeResponse(ctx, keepAlive, responseJson);
                    }
                });
                bizThreadPool.execute(logRun);
            } finally {
                // The I/O thread is shared between channels; do not leak the trace id.
                MDC.remove("TRACE_ID");
            }
        }

        /**
         * Writes a {@code 200 OK} response carrying {@code responseJson} as its body.
         */
        private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
            writeResponse(ctx, keepAlive, HttpResponseStatus.OK, responseJson);
        }

        /**
         * Writes a full HTTP response and, for non keep-alive requests, closes the
         * connection once the write has completed.
         *
         * @param ctx       channel context to write to
         * @param keepAlive whether the client asked to keep the connection open
         * @param status    HTTP status of the response
         * @param body      response body; {@code null} is sent as an empty body
         */
        private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, HttpResponseStatus status,
                                   String body) {
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
                Unpooled.copiedBuffer(body == null ? "" : body, CharsetUtil.UTF_8));
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
            response.headers().set(HttpHeaderNames.CONNECTION,
                keepAlive ? HttpHeaderValues.KEEP_ALIVE : HttpHeaderValues.CLOSE);

            ChannelFuture writeFuture = ctx.writeAndFlush(response);
            if (!keepAlive) {
                // The client does not want the connection reused: close it after the response is out.
                writeFuture.addListener(f -> ctx.close());
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        /** Logs the failure and closes the channel. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.error(">>>>>>>>>>> retry task provider netty_http server caught exception", cause);
            ctx.close();
        }

        /** Closes the channel on an idle event; other events are passed on unchanged. */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                ctx.channel().close();      // beat 3N, close if idle
                logger.debug(">>>>>>>>>>> retry task provider netty_http server close an idle channel.");
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }

}
